#include "netlib/net/event/event_loop.h"

#include <sys/eventfd.h>
#include <sys/poll.h>
#include <unistd.h>

#include <cassert>

#include "netlib/base/logger.h"
#include "netlib/base/time_stamp.h"
#include "netlib/net/event/channel.h"
#include "netlib/net/poller/poller.h"
#include "netlib/net/timer/timer_queue.h"

// #define WAKE_UP 0x

namespace {
__thread netlib::net::EventLoop* t_loop_int_this_thread = nullptr;

int CreateEventfd() {
	int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (evtfd < 0) {
		LOG_SYSERR << "Failed in eventfd";
		abort();
	}
	return evtfd;
}
} // namespace

const int netlib::net::EventLoop::kPollTimeout = 5;
const int64_t netlib::net::EventLoop::kWakeupMsg = 1;

netlib::net::EventLoop::EventLoop(bool is_epoll) :
    kThreadId(current_thread::Tid()), poller_(Poller::GetPoller(this, is_epoll)),
    timer_queue_(new TimerQueue(this)), wakeup_channel_(new Channel(this, CreateEventfd())) {
	LOG_TRACE << "EventLoop created " << this << " in thread " << kThreadId;

	if (t_loop_int_this_thread) {
		LOG_FATAL << "Another EventLoop " << t_loop_int_this_thread << " exists in this thread "
		          << kThreadId;
	} else {
		t_loop_int_this_thread = this;
	}
}

netlib::net::EventLoop::~EventLoop() {
	assert(!is_looping_);
	LOG_DEBUG << "EventLoop " << this << " of thread " << kThreadId << " destructs in thread "
	          << current_thread::Tid();
	wakeup_channel_->DisableAll();
	wakeup_channel_->Remove();
	close(wakeup_channel_->Fd());
	// delete wakeup_channel_;
	delete timer_queue_;
	wakeup_channel_ = nullptr;
	timer_queue_ = nullptr;
	t_loop_int_this_thread = nullptr;
}

netlib::net::EventLoop* netlib::net::EventLoop::GetEventLoopOfCurrentThread() {
	return t_loop_int_this_thread;
}

void netlib::net::EventLoop::RemoveChannel(Channel* channel) {
	assert(channel->OwnerLoop() == this);
	AssertInLoopThread();
	if (is_handling_event_) {
		assert(std::find(activate_channels_.begin(), activate_channels_.end(), channel) ==
		       activate_channels_.end());
	}
	poller_->RemoveChannel(channel);
}

void netlib::net::EventLoop::UpdateChannel(Channel* channel) {
	assert(channel->OwnerLoop() == this);
	AssertInLoopThread();
	poller_->UpdateChannel(channel);
}

void netlib::net::EventLoop::RunInLoop(const Functor& functor) {
	if (IsInLoopThread()) {
		functor();
	} else {
		QueueInLoop(functor);
	}
}

netlib::net::TimerIndex netlib::net::EventLoop::RunAt(const Timestamp& time,
                                                      const TimerCallback& cb) {
	return timer_queue_->AddTimer(cb, time, 0.0);
}

netlib::net::TimerIndex netlib::net::EventLoop::RunAfter(double delay, const TimerCallback& cb) {
	Timestamp time(AddTime(Timestamp::Now(), delay));
	return RunAt(time, cb);
}

netlib::net::TimerIndex netlib::net::EventLoop::RunPeriodically(double interval,
                                                                const TimerCallback& cb) {
	Timestamp time(AddTime(Timestamp::Now(), interval));
	return timer_queue_->AddTimer(cb, time, interval);
}

void netlib::net::EventLoop::Cancel(TimerIndex timeridx) { timer_queue_->Cancel(timeridx); }

void netlib::net::EventLoop::AbortNotInLoopThread() {}

void netlib::net::EventLoop::Loop() {
	assert(!is_looping_);
	AssertInLoopThread();
	is_looping_ = true;
	while (!is_quit_) {
		activate_channels_.clear();
		Timestamp time = poller_->Poll(kPollTimeout, &activate_channels_);

		for (Channel* channel : activate_channels_) {
			channel->HandleEvent(time);
		}
		DoPendingFunctors();
	}
	LOG_TRACE << "EventLoop " << this << " stop looping";
	is_looping_ = false;
}

void netlib::net::EventLoop::QueueInLoop(const Functor& functor) {
	{
		MutexLockGuard lock(mutex_);
		pending_functors_.push_back(functor);
	}
	if (!IsInLoopThread() || is_calling_pending_functors_) {
		Wakeup();
	}
}

void netlib::net::EventLoop::DoPendingFunctors() {
	decltype(pending_functors_) tmp_functor;
	is_calling_pending_functors_ = true;
	{
		MutexLockGuard lock(mutex_);
		tmp_functor.swap(pending_functors_);
	}
	for (const Functor& f : tmp_functor) {
		f();
	}
	is_calling_pending_functors_ = false;
}

void netlib::net::EventLoop::Wakeup() {
	// char wake_msg = 1;
	size_t len = write(wakeup_channel_->Fd(), &kWakeupMsg, sizeof(kWakeupMsg));
	if (len != sizeof(kWakeupMsg)) {
		LOG_SYSERR << "EventLoop::wakeup() writes " << len << " bytes instead of " << sizeof(char);
	}
}

void netlib::net::EventLoop::HandleWakeup() {
	int64_t buf{0};
	size_t len = read(wakeup_channel_->Fd(), &buf, sizeof(buf));
	if (len != sizeof(kWakeupMsg)) {
		LOG_ERROR << "EventLoop::handleWakeup() reads " << len << " bytes instead of "
		          << sizeof(kWakeupMsg);
	}
}

netlib::LogStream& netlib::net::operator<<(LogStream& stream, EventLoop* loop) {
	stream << static_cast<void*>(loop);
	return stream;
}